Python filterを業務で活用する実装パターン|実務データ処理の完全ガイド

未分類

Python filterを業務で活用する実装パターン|実務データ処理の完全ガイド

1. filter関数の基本的な仕組み

Pythonのfilter()関数は、イテラブルオブジェクトの要素を条件に基づいてフィルタリングする組み込み関数です。基本的な構文は以下の通りです。

filter(function, iterable)

第1引数に判定関数、第2引数にリストなどのイテラブルを指定します。判定関数がTrueを返した要素のみを含む新しいイテレータが返されます。

教科書的な例では以下のようになります。

# シンプルな例
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 偶数のみを抽出
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers)  # [2, 4, 6, 8, 10]

しかし、業務レベルではこのような単純な用途では使用されません。実際の業務では、より複雑なデータ構造、外部データベースとの連携、エラーハンドリング、パフォーマンス最適化などを考慮する必要があります。

2. 業務でのユースケース

Pythonのfilter関数が活躍する業務シーンをいくつか紹介します。

2.1 ユーザーデータの抽出

顧客管理システムから特定の条件を満たすユーザーを抽出する場合、filter関数が重宝します。例えば、アクティブなユーザーのみを取得したい場合です。

2.2 ログファイルの処理

アプリケーションログから特定レベルのエラーのみを抽出する際に活用できます。大量のログファイルの中からエラーだけを取り出す作業は頻繁に発生します。

2.3 APIレスポンスの加工

外部APIから取得したJSONデータから不要な項目を除外する際に利用できます。レスポンスサイズを削減し、ダウンストリームの処理を効率化できます。

2.4 データベースクエリ結果の後処理

SQLで取得したデータを、Pythonアプリケーション層でさらに絞り込む場合があります。リアルタイムの計算結果に基づいた抽出などが該当します。

3. 実装コード集

3.1 ユーザー管理システムの実装例

最初の実務例として、ユーザー管理システムを想定したコードを示します。

from datetime import datetime, timedelta
from typing import List, Dict, Callable
from dataclasses import dataclass

@dataclass
class User:
    user_id: int
    name: str
    email: str
    status: str  # 'active', 'inactive', 'suspended'
    created_at: datetime
    last_login: datetime
    subscription_tier: str  # 'free', 'pro', 'enterprise'

class UserFilterManager:
    def __init__(self, users: List[User]):
        self.users = users
        self.filters: List[Callable[[User], bool]] = []
    
    def add_status_filter(self, status: str) -> 'UserFilterManager':
        \"\"\"ステータスでフィルタリング\"\"\"
        self.filters.append(lambda u: u.status == status)
        return self
    
    def add_active_recently_filter(self, days: int = 30) -> 'UserFilterManager':
        \"\"\"直近N日間のアクティブユーザーをフィルタリング\"\"\"
        cutoff_date = datetime.now() - timedelta(days=days)
        self.filters.append(lambda u: u.last_login > cutoff_date)
        return self
    
    def add_subscription_filter(self, tier: str) -> 'UserFilterManager':
        \"\"\"サブスクリプション層でフィルタリング\"\"\"
        self.filters.append(lambda u: u.subscription_tier == tier)
        return self
    
    def apply_filters(self) -> List[User]:
        \"\"\"すべてのフィルターを適用\"\"\"
        result = self.users
        for filter_func in self.filters:
            result = list(filter(filter_func, result))
        return result

# 使用例
users = [
    User(1, 'Alice', 'alice@example.com', 'active', 
         datetime(2024, 1, 1), datetime(2024, 1, 15), 'pro'),
    User(2, 'Bob', 'bob@example.com', 'inactive',
         datetime(2023, 6, 1), datetime(2023, 12, 1), 'free'),
    User(3, 'Charlie', 'charlie@example.com', 'active',
         datetime(2024, 1, 5), datetime(2024, 1, 16), 'enterprise'),
    User(4, 'Diana', 'diana@example.com', 'suspended',
         datetime(2023, 11, 1), datetime(2023, 12, 15), 'pro'),
]

# アクティブで直近30日間にログインしているProユーザーを取得
filtered_users = (UserFilterManager(users)
                  .add_status_filter('active')
                  .add_active_recently_filter(30)
                  .add_subscription_filter('pro')
                  .apply_filters())

print(f\"Filtered users: {len(filtered_users)}\")
for user in filtered_users:
    print(f\"  - {user.name} ({user.email})\")

3.2 ログファイル処理の実装例

実務でよく出現するログファイル解析のコードです。

import re
from enum import Enum
from datetime import datetime
from typing import List, Optional

class LogLevel(Enum):
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

class LogEntry:
    def __init__(self, line: str):
        # ログ形式: [2024-01-16 10:30:45] [ERROR] message
        self.raw_line = line
        self.parse()
    
    def parse(self):
        pattern = r'\\[(.*?)\\] \\[(.*?)\\] (.*)'
        match = re.match(pattern, self.raw_line)
        if match:
            self.timestamp = datetime.fromisoformat(match.group(1))
            self.level = match.group(2)
            self.message = match.group(3)
        else:
            self.timestamp = None
            self.level = 'UNKNOWN'
            self.message = self.raw_line
    
    def is_error_or_critical(self) -> bool:
        return self.level in ['ERROR', 'CRITICAL']
    
    def contains_keyword(self, keyword: str) -> bool:
        return keyword.lower() in self.message.lower()

class LogProcessor:
    def __init__(self, log_file_path: str):
        self.log_file_path = log_file_path
        self.entries: List[LogEntry] = []
        self._load_logs()
    
    def _load_logs(self):
        \"\"\"ログファイルを読み込み\"\"\"
        try:
            with open(self.log_file_path, 'r', encoding='utf-8') as f:
                self.entries = [LogEntry(line.strip()) for line in f 
                               if line.strip()]
        except FileNotFoundError:
            print(f\"ログファイルが見つかりません: {self.log_file_path}\")
            self.entries = []
    
    def get_errors(self) -> List[LogEntry]:
        \"\"\"エラー以上のログを取得\"\"\"
        return list(filter(lambda e: e.is_error_or_critical(), self.entries))
    
    def get_logs_with_keyword(self, keyword: str) -> List[LogEntry]:
        \"\"\"特定キーワードを含むログを取得\"\"\"
        return list(filter(lambda e: e.contains_keyword(keyword), self.entries))
    
    def get_errors_with_keyword(self, keyword: str) -> List[LogEntry]:
        \"\"\"エラーレベルで特定キーワードを含むログを取得\"\"\"
        error_logs = self.get_errors()
        return list(filter(lambda e: e.contains_keyword(keyword), error_logs))
    
    def export_critical_logs(self, output_path: str):
        \"\"\"クリティカルログのみを抽出して別ファイルに出力\"\"\"
        critical_logs = list(filter(
            lambda e: e.level == 'CRITICAL', 
            self.entries
        ))
        with open(output_path, 'w', encoding='utf-8') as f:
            for log in critical_logs:
                f.write(log.raw_line + '\\n')
        return len(critical_logs)

# 使用例
if __name__ == '__main__':
    processor = LogProcessor('app.log')
    
    # エラーログを取得
    errors = processor.get_errors()
    print(f\"エラーログ数: {len(errors)}\")
    
    # \"database\"キーワードを含むエラーを取得
    db_errors = processor.get_errors_with_keyword('database')
    print(f\"Database関連のエラー: {len(db_errors)}\")
    
    # クリティカルログをエクスポート
    count = processor.export_critical_logs('critical.log')
    print(f\"クリティカルログ: {count}件をexport\")

3.3 APIレスポンス加工の実装例

外部APIからのデータを加工する実務パターンです。

import json
import requests
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin

class APIDataProcessor:
    def __init__(self, api_base_url: str):
        self.api_base_url = api_base_url
    
    def fetch_products(self, category: Optional[str] = None) -> List[Dict]:
        \"\"\"商品データを取得\"\"\"
        url = urljoin(self.api_base_url, '/api/products')
        params = {'category': category} if category else {}
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json().get('products', [])
        except requests.RequestException as e:
            print(f\"APIエラー: {e}\")
            return []
    
    def filter_available_products(self, products: List[Dict]) -> List[Dict]:
        \"\"\"利用可能な商品のみを抽出\"\"\"
        return list(filter(
            lambda p: p.get('in_stock') and p.get('price', 0) > 0,
            products
        ))
    
    def filter_by_price_range(self, products: List[Dict], 
                             min_price: float, max_price: float) -> List[Dict]:
        \"\"\"価格範囲でフィルタリング\"\"\"
        return list(filter(
            lambda p: min_price <= p.get('price', 0) <= max_price,
            products
        ))
    
    def filter_high_rating_products(self, products: List[Dict], 
                                   min_rating: float = 4.0) -> List[Dict]:
        \"\"\"評価でフィルタリング\"\"\"
        return list(filter(
            lambda p: p.get('rating', 0) >= min_rating and 
                      p.get('review_count', 0) >= 10,
            products
        ))
    
    def get_featured_products(self, products: List[Dict]) -> List[Dict]:
        \"\"\"フィーチャー商品を抽出(複数条件)\"\"\"
        available = self.filter_available_products(products)
        featured = list(filter(
            lambda p: p.get('featured') or 
                      (p.get('rating', 0) >= 4.5 and 
                       p.get('review_count', 0) >= 50),
            available
        ))
        return featured[:10]  # 最大10件返却
    
    def remove_sensitive_fields(self, products: List[Dict]) -> List[Dict]:
        \"\"\"機密情報を削除してクライアント向けにサニタイズ\"\"\"
        sensitive_fields = ['supplier_id', 'cost_price', 'internal_notes']
        
        def sanitize(product: Dict) -> Dict:
            return {k: v for k, v in product.items() 
                   if k not in sensitive_fields}
        
        # フィルター後にサニタイズ
        available = self.filter_available_products(products)
        return [sanitize(p) for p in available]

# 実務での使用パターン
class ECommerceService:
    def __init__(self, api_processor: APIDataProcessor):
        self.processor = api_processor
    
    def get_recommended_products(self, budget: float) -> List[Dict]:
        \"\"\"推奨商品を取得(予算内で高評価)\"\"\"
        products = self.processor.fetch_products()
        
        # 予算内かつ在庫ありのみ
        affordable = self.processor.filter_by_price_range(
            products, 0, budget
        )
        
        # その中から高評価のみ
        recommended = self.processor.filter_high_rating_products(affordable)
        
        return recommended
    
    def get_trending_products(self) -> List[Dict]:
        \"\"\"トレンド商品を取得\"\"\"
        products = self.processor.fetch_products()
        
        # 機密情報削除
        sanitized = self.processor.remove_sensitive_fields(products)
        
        # フィーチャー商品を取得
        featured = self.processor.get_featured_products(products)
        
        return featured

# 使用例
if __name__ == '__main__':
    processor = APIDataProcessor('https://api.example.com')
    service = ECommerceService(processor)
    
    # 予算5000円以下の推奨商品
    # recommended = service.get_recommended_products(5000)
    # print(f\"推奨商品: {len(recommended)}件\")

3.4 データベース結果の後処理

SQLの結果をPythonで加工する実務パターンです。

from datetime import datetime, timedelta
from typing import List, Dict, Any
import sqlite3

class OrderFilterService:
    def __init__(self, db_path: str):
        self.db_path = db_path
    
    def get_orders_from_db(self) -> List[Dict]:
        \"\"\"データベースから注文データを取得\"\"\"
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            
            cursor.execute('''
                SELECT order_id, customer_id, order_date, total_amount, 
                       status, payment_method
                FROM orders
                ORDER BY order_date DESC
            ''')
            
            orders = [dict(row) for row in cursor.fetchall()]
            conn.close()
            return orders
        except sqlite3.Error as e:
            print(f\"DB エラー: {e}\")
            return []
    
    def filter_high_value_orders(self, orders: List[Dict], 
                                threshold: float = 10000) -> List[Dict]:
        \"\"\"高額注文をフィルタリング\"\"\"
        return list(filter(
            lambda o: o['total_amount'] >= threshold,
            orders
        ))
    
    def filter_pending_orders(self, orders: List[Dict]) -> List[Dict]:
        \"\"\"未処理注文をフィルタリング\"\"\"
        return list(filter(
            lambda o: o['status'] in ['pending', 'processing'],
            orders
        ))
    
    def filter_recent_orders(self, orders: List[Dict], 
                            days: int = 7) -> List[Dict]:
        \"\"\"N日以内の注文をフィルタリング\"\"\"
        cutoff_date = datetime.now() - timedelta(days=days)
        
        return list(filter(
            lambda o: datetime.fromisoformat(o['order_date']) > cutoff_date,
            orders
        ))
    
    def filter_card_payment_orders(self, orders: List[Dict]) -> List[Dict]:
        \"\"\"カード決済のみをフィルタリング\"\"\"
        return list(filter(
            lambda o: o['payment_method'] in ['credit_card', 'debit_card'],
            orders
        ))
    
    def get_urgent_orders(self) -> List[Dict]:
        \"\"\"緊急処理が必要な注文を取得\"\"\"
        all_orders = self.get_orders_from_db()
        
        # 未処理かつ高額な注文
        urgent = list(filter(
            lambda o: o['status'] in ['pending', 'processing'] and 
                      o['total_amount'] >= 50000,
            all_orders
        ))
        
        return urgent

# 使用例
if __name__ == '__main__':
    service = OrderFilterService('orders.db')
    orders = service.get_orders_from_db()
    
    # 未処理の高額注文
    pending_high = (OrderFilterService(service.db_path)
                    .filter_pending_orders(
                        service.filter_high_value_orders(orders, 50000)
                    ))
    
    print(f\"緊急処理対象: {len(pending_high)}件\")

4. よくある応用パターン

4.1 複数条件を組み合わせたフィルタリング

from functools import reduce
from typing import List, Callable, TypeVar

T = TypeVar('T')

class MultiFilterBuilder:
    \"\"\"複数のフィルター条件を組み合わせるビルダーパターン\"\"\"
    
    def __init__(self, items: List[T]):
        self.items = items
        self.conditions: List[Callable[[T], bool]] = []
    
    def add_condition(self, condition: Callable[[T], bool]) -> 'MultiFilterBuilder':
        \"\"\"条件を追加 (AND条件)\"\"\"
        self.conditions.append(condition)
        return self
    
    def apply(self) -> List[T]:
        \"\"\"すべての条件を適用\"\"\"
        def combined_filter(item: T) -> bool:
            return all(condition(item) for condition in self.conditions)
        
        return list(filter(combined_filter, self.items))

# 使用例
class Product:
    def __init__(self, name: str, price: float, category: str, 
                 in_stock: bool, rating: float):
        self.name = name
        self.price = price
        self.category = category
        self.in_stock = in_stock
        self.rating = rating

products = [
    Product('Laptop', 150000, 'Electronics', True, 4.5),
    Product('Mouse', 3000, 'Electronics', True, 4.0),
    Product('Desk', 50000, 'Furniture', False, 3.8),
    Product('Chair', 45000, 'Furniture', True, 4.2),
]

# 在庫ありかつ4.0以上の評価かつ価格10000以上
filtered = (MultiFilterBuilder(products)
            .add_condition(lambda p: p.in_stock)
            .add_condition(lambda p: p.rating >= 4.0)
            .add_condition(lambda p: p.price >= 10000)
            .apply())

for product in filtered:
    print(f\"{product.name}: ¥{product.price} ({product.rating})\")

4.2 フィルタリング結果の変換(map + filter)

from typing import List, Dict

class DataTransformer:
    \"\"\"フィルタリング後にデータを変換\"\"\"
    
    @staticmethod
    def extract_active_users_emails(users: List[Dict]) -> List[str]:
        \"\"\"アクティブユーザーのメールアドレスのみを抽出\"\"\"
        active_users = filter(lambda u: u['status'] == 'active', users)
        return list(map(lambda u: u['email'], active_users))
    
    @staticmethod
    def get_high_value_customer_summary(orders: List[Dict]) -> List[Dict]:
        \"\"\"高額顧客の概要を取得\"\"\"
        high_value = filter(lambda o: o['total'] >= 100000, orders)
        return list(map(
            lambda o: {
                'customer_id': o['customer_id'],
                'total': o['total'],
                'discount': int(o['total'] * 0.1)
            },
            high_value
        ))

# 使用例
users = [
    {'id': 1, 'email': 'user1@example.com', 'status': 'active'},
    {'id': 2, 'email': 'user2@example.com', 'status': 'inactive'},
    {'id': 3, 'email': 'user3@example.com', 'status': 'active'},
]

active_emails = DataTransformer.extract_active_users_emails(users)
print(active_emails)  # ['user1@example.com', 'user3@example.com']

4.3 パフォーマンスを考慮したフィルタリング

import time
from typing import List, Iterator, Callable

class EfficientFilter:
    \"\"\"大規模データセット向けの効率的なフィルタリング\"\"\"
    
    @staticmethod
    def lazy_filter(items: List[int], condition: Callable) -> Iterator[int]:
        \"\"\"ジェネレータを使用した遅延フィルタリング\"\"\"
        for item in items:
            if condition(item):
                yield item
    
    @staticmethod
    def batch_filter(items: List[int], batch_size: int = 1000,
                    condition: Callable = lambda x: x % 2 == 0) -> List[int]:
        \"\"\"バッチ処理によるフィルタリング\"\"\"
        result = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            filtered_batch = filter(condition, batch)
            result.extend(filtered_batch)
        return result

# ベンチマーク
if __name__ == '__main__':
    data = list(range(1000000))
    
    # 方法1: listで一度に処理
    start = time.time()
    result1 = list(filter(lambda x: x % 2 == 0, data))
    time1 = time.time() - start
    print(f\"filter + list: {time1:.4f}秒\")
    
    # 方法2: ジェネレータで遅延処理
    start = time.time()
    result2 = list(EfficientFilter.lazy_filter(data, lambda x: x % 2 == 0))
    time2 = time.time() - start
    print(f\"lazy_filter: {time2:.4f}秒\")
    
    # 方法3: リスト内包表記
    start = time.time()
    result3 = [x for x in data if x % 2 == 0]
    time3 = time.time() - start
    print(f\"リスト内包表記: {time3:.4f}秒\")

5. 注意点と最適化

5.1 filterと他の方法の使い分け

Python業務では、filterの代わりにリスト内包表記を使うことが推奨される場合が多いです。理由を説明します。

# ❌ filterの使用
result1 = list(filter(lambda x: x > 5, range(100)))

# ✅ リスト内包表記の使用(推奨)
result2 = [x for x in range(100) if x > 5]

# 複雑な条件ではfilterも有効
users = [...]
result3 = list(filter(
    lambda u: u.status == 'active' and 
              u.created_at > cutoff_date and
              u.subscription_tier == 'pro',
    users
))

# ただしリスト内包表記でも記述可能
result4 = [u for u in users 
          if u.status == 'active' and
          u.created_at > cutoff_date and
          u.subscription_tier == 'pro']

業務では、可読性を最優先とすべきです。リスト内包表記の方が一般的にPythonコミュニティでも推奨されています。

5.2 エラーハンドリング

from typing import List, Dict, Optional

class SafeFilter:
    \"\"\"エラーハンドリング付きのフィルタリング\"\"\"
    
    @staticmethod
    def safe_filter(items: List[Dict], 
                   key: str, 
                   value: Any) -> List[Dict]:
        \"\"\"キーが存在しない場合も安全に処理\"\"\"
        return list(filter(
            lambda item: item.get(key) == value,
            items
        ))
    
    @staticmethod
    def filter_with_validation(items: List[Dict]) -> List[Dict]:
        \"\"\"データ型を検証してからフィルタリング\"\"\"
        def is_valid(item: Dict) -> bool:
            try:
                return (isinstance(item.get('price'), (int, float)) and
                       item['price'] > 0 and
                       isinstance(item.get('name'), str) and
                       len(item['name']) > 0)
            except (KeyError, TypeError):
                return False
        
        return list(filter(is_valid, items))

# 使用例
products = [
    {'name': 'Product A', 'price': 1000},
    {'name': 'Product B', 'price': -100},  # 無効な価格
    {'name': '', 'price': 2000},  # 無効な名前
    {'name': 'Product C', 'price': 3000},
]

valid_products = SafeFilter.filter_with_validation(products)
print(f\"有効な商品: {len(valid_products)}件\")

5.3 大規模データでのメモリ効率

from typing import Iterator, List
import sys

class MemoryEfficientProcessing:
\"\"\"大規模CSVファイルの処理例\"\"\"

@staticmethod
def process_large_csv(file_path: str) -> Iterator[Dict]:
\"\"\"CSVファイルを行単位で処理(メモリ効率的)\"\"\"
import csv

with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)

# ジェネレータを使用して1行ずつ処理
for row in reader:
if row.get('status') == 'active':
yield row

@staticmethod
def filter_and_aggregate(file_path: str) -> Dict:
\"\"\"フィルタリングと集計を同時に実行\"\"\

タイトルとURLをコピーしました